
Reduction applies an operator to a vector repeatedly to return a number.

[reduce(+, 1:8) sum(1:8)]

 36  36

[reduce(*, 1:8) prod(1:8)]

 40320  40320

@show reduce(boring, 1:8)
@show reduce(boring2, 1:8)

reduce(boring,1:8) => 1
reduce(boring2,1:8) => 8

You can also use reduce to compute Fibonacci numbers using their recurrences.

$$\begin{pmatrix} f_2 \\ f_1 \end{pmatrix} = \begin{pmatrix} f_1 + f_0 \\ f_1 \end{pmatrix} = \begin{pmatrix} 1 & 1 \\ 1 & 0 \end{pmatrix} \begin{pmatrix} f_1 \\ f_0 \end{pmatrix} $$$$\begin{pmatrix} f_{n+1} \\ f_n \end{pmatrix} = \dots = \begin{pmatrix} 1 & 1 \\ 1 & 0 \end{pmatrix}^n \begin{pmatrix} f_1 \\ f_0 \end{pmatrix} $$

From this, you can show that

$$\begin{pmatrix} 1 & 1 \\ 1 & 0 \end{pmatrix}^n = \begin{pmatrix} f_{n+1} & f_n \\ f_n & f_{n-1} \end{pmatrix} $$

fib(j)=reduce(*, [[1 1; 1 0] for i=1:j])
map(fib, [4, 7])

 2x2 Array{Int64,2}:
 5  3
 3  2    
 2x2 Array{Int64,2}:
 21  13
 13   8

You can solve recurrences of any complexity using reduce. For example, reduce can compute a Hadamard matrix from its definition in terms of its submatrices:

$$H_2 = \begin{pmatrix} H_1 & H_1 \\ H_1 & -H_1 \end{pmatrix} = \begin{pmatrix} 1 & 1 \\ 1 & -1 \end{pmatrix} \otimes H_1$$

and so on.

Hadamard(n)=reduce(kron, [[1 1; 1 -1] for i=1:n])

 1   1   1   1   1   1   1   1
 1  -1   1  -1   1  -1   1  -1
 1   1  -1  -1   1   1  -1  -1
 1  -1  -1   1   1  -1  -1   1
 1   1   1   1  -1  -1  -1  -1
 1  -1   1  -1  -1   1  -1   1
 1   1  -1  -1  -1  -1   1   1
 1  -1  -1   1  -1   1   1  -1

ans*ans' #This is a legitimate Hadamard matrix

 8  0  0  0  0  0  0  0
 0  8  0  0  0  0  0  0
 0  0  8  0  0  0  0  0
 0  0  0  8  0  0  0  0
 0  0  0  0  8  0  0  0
 0  0  0  0  0  8  0  0
 0  0  0  0  0  0  8  0
 0  0  0  0  0  0  0  8

#Reduction of matrix multiplications
M=[randn(2,2) for i=1:4]
printnice(reduce((A,B)->B*A, M)) #backward multiply
printnice(reduce(*, M))          #forward multiply

-7.087	4.835
2.571	-1.748

-7.087	4.835
2.571	-1.748

.032	-.079
-.892	.958

In the following example we apply reduce() to function composition:

h=reduce((f,g)->(x->f(g(x))), [sin cos tan])
[h(pi) sin(cos(tan(pi)))]

 0.841471  0.841471


Having discussed reduce, we are now ready for the idea behind prefix sum.

Suppose you wanted to compute the partial sums of a vector, i.e. given y[1:n], we want to overwrite the vector y with the vector of partial sums

new_y[1] = y[1]
new_y[2] = y[1] + y[2]
new_y[3] = y[1] + y[2] + y[3]

At first blush, it seems impossible to parallelize this, since

new_y[1] = y[1]
new_y[2] = new_y[1] + y[2]
new_y[3] = new_y[2] + y[3]

which is an intrinsically serial process.

function prefix_serial!(y,+)
    @inbounds for i=2:length(y)

prefix_serial! (generic function with 1 method)

However, it turns out that because addition (+) is associative, we can regroup the order of how these sums are carried out. (This of course extends to other associative operations such as multiplication.) Another ordering of 8 associative operations is provided by prefix8!:

function prefix8!(y, *)
    length(y)==8 || error("length 8 only")
    for i in [2,4,6,8]; y[i]=y[i-1]*y[i]; end
    for i in [  4,  8]; y[i]=y[i-2]*y[i]; end
    for i in [      8]; y[i]=y[i-4]*y[i]; end
    for i in [    6  ]; y[i]=y[i-2]*y[i]; end
    for i in [ 3,5,7 ]; y[i]=y[i-1]*y[i]; end

function prefix!(y,.*)
    @inbounds for j=1:k, i=2^j:2^j:min(l, 2^k)              #"reduce"
    @inbounds for j=(k-1):-1:1, i=3*2^(j-1):2^j:min(l, 2^k) #"broadcast"

prefix! (generic function with 1 method)

prefix! rearranges the operations to form two spanning trees:

In [11]:
using Compose
function gate(proc_from, proc_to, depth, num_procs;
              radius_in=0.1/num_procs, radius_gate=0.25/num_procs)
    const k::Int=ceil(log2(num_procs))              #Height of tree
    in1 = (proc_from-0.5)/num_procs, depth/(2k)     #Coordinates of input gate 1
    in2 = (proc_to-0.5)/num_procs, depth/(2k)       #Coordinates of input gate 2
    op  = (proc_to-0.5)/num_procs, (depth+0.5)/(2k) #Coordinates of gate operation
        compose(canvas(), circle(in1..., radius_in), circle(in2..., radius_in),
                          circle(op..., radius_gate), fill("white")),
        compose(canvas(), lines(in1, op), lines(in2, op), linewidth(0.3mm)))

function drawcircuit(num_procs::Int)
    const k::Int=ceil(log2(num_procs))                    #Height of tree
    for j=1:k, i=2^j:2^j:min(num_procs, 2^k)              #Draw "reduce" tree
        push!(circuit, gate(i-2^(j-1), i,    j, num_procs))
    for j=(k-1):-1:1, i=3*2^(j-1):2^j:min(num_procs, 2^k) #Draw "broadcast" tree
        push!(circuit, gate(i-2^(j-1), i, 2k-j, num_procs))
    push!(circuit, compose(canvas(), #Draw vertical lines for each processor
      [lines(((i-0.5)/num_procs,0),((i-0.5)/num_procs,1)) for i=1:num_procs]...,
      linewidth(0.1mm), stroke("grey")))
    compose(canvas(), circuit...)
@time drawcircuit(8)

Now let's run prefix in parallel on every core on the CPU. Use addprocs to attach additional worker processes.

@show Sys.CPU_CORES
@show nprocs() #Current number of processes
@show procs()  #List of processes by index

#Add a specified number of processors
num_procs = 8#Sys.CPU_CORES
addprocs(max(0, num_procs-nprocs()))

@show nprocs() #Current number of processes
@show procs()  #List of processes by index

import Base: fetch, length
fetch(t::Vector) = map(fetch, t) #Vectorize fetch

#Define elementary operations on remote data
+(r1::RemoteRef,r2::RemoteRef)=@spawnat r2.where fetch(r1)+fetch(r2)
*(r1::RemoteRef,r2::RemoteRef)=@spawnat r2.where fetch(r1)*fetch(r2)

Sys.CPU_CORES => 80
nprocs() => 1
procs() => [1]
nprocs() => 8
procs() => [1,2,3,4,5,6,7,8]
* (generic function with 133 methods)

#This line of code corresponds to one element of the diagram
# +(r1::RemoteRef,r2::RemoteRef)=@spawnat r2.where fetch(r1)+fetch(r2)


For 8 processes, the serial version requires 7 operations. The parallel version uses 11 operations, but they are grouped into 5 simultaneous chunks of operations. Hence we expect that the parallel version runs in 5/7 the time needed for the naïve serial code.

#Before timing, force Julia to JIT compile the functions
for f in (prefix_serial!, prefix!)
    f([randn(3,3) for i=1:2], *)

@time r=RemoteRef[@spawnat p randn(n,n) for p in procs()] #Create RemoteRefs
s=fetch(r) #These are in-memory matrices
t=copy(r)  #These are RemoteRefs

tic(); prefix_serial!(s, *); t_ser = toc()
tic(); @sync prefix!(t, *); t_par = toc() #Caution: race condition bug #4330
@printf("Serial: %.3f sec  Parallel: %.3f sec  speedup: %.3fx (theory=1.4x)", t_ser, t_par, t_ser/t_par)

elapsed time: 0.13184095 seconds (33593504 bytes allocated)
elapsed time: 15.151019988 seconds
elapsed time: 11.768449118 seconds
Serial: 15.151 sec  Parallel: 11.768 sec  speedup: 1.287x (theory=1.4x)

The exact same serial code now runs in parallel thanks to multiple dispatch!

#Which method is used with these arguments?
@show typeof(s[1])
which(*, s[1:2]...)
@show typeof(t[1])
which(*, t[1:2]...)

typeof(s[1]) => Array{Float64,2}
*{T<:Union(Float32,Float64,Complex{Float64},Complex{Float32})}(A::Union(SubArray{T<:Union(Float32,Float64,Complex{Float64},Complex{Float32}),2,A<:DenseArray{T,N},I<:(Union(Range{Int64},Int64,Range1{Int64})...,)},DenseArray{T<:Union(Float32,Float64,Complex{Float64},Complex{Float32}),2}),B::Union(SubArray{T<:Union(Float32,Float64,Complex{Float64},Complex{Float32}),2,A<:DenseArray{T,N},I<:(Union(Range{Int64},Int64,Range1{Int64})...,)},DenseArray{T<:Union(Float32,Float64,Complex{Float64},Complex{Float32}),2})) at linalg/matmul.jl:92
typeof(t[1]) => RemoteRef
*(r1::RemoteRef,r2::RemoteRef) at In[12]:18

In [ ]: